S3イベントをトリガーにECS on Fargateでタスクを実行する
S3イベントをトリガーに何かしらの処理を実行する際にはLambdaを使うのが便利ですが、処理時間やメモリなどの制約でLambdaが使えない場合にはECSやGlueといったサービスの利用を検討する必要があります。今回はその中でもECSを選び、S3イベントを元にECS on Fargateでタスクを実行させる場合について試してみました。 S3イベントをトリガーとして実行されるLambdaからECS on Fargateのタスクを実行するという流れです。
やってみる
S3にオブジェクトをPUTしたら、別の場所にオブジェクトをコピーするという処理を試してみます。この程度の軽い処理であればLambdaで良さそうですが、今回は試すことが目的なのでECSを使います。 ECRやECSの設定手順に関する説明は当エントリでは一部省略しています。以下のエントリが詳しいので、そちらをご覧ください。
ECRにコンテナイメージをプッシュする
指定したS3オブジェクトをコピーするPythonスクリプトを実行するコンテナイメージを作成し、ECRのリポジトリへプッシュします。
ECSからコンテナイメージを参照できるようにするために、ECRやDocker Hubなどにコンテナイメージをプッシュする必要があります。今回はECRを使うため、まずはマネジメントコンソールからECRでリポジトリを作成します。
ECRのリポジトリが作成できたら、ローカルでコンテナイメージをビルドします。 コンテナイメージを構成するDockerfileは次の通りです。
FROM python:3 WORKDIR /usr/src/app COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt COPY script.py ./ CMD [ "python", "./script.py" ]
Pythonスクリプトは以下のような内容です。
import boto3 import os from os import path EVENT_BUCKET = os.environ["EVENT_BUCKET"] EVENT_OBJECKT_KEY = os.environ["EVENT_OBJECKT_KEY"] DESTINATION_BUCKET = os.environ["DESTINATION_BUCKET"] DESTINATION_OBJECKT_DIR = os.environ["DESTINATION_OBJECKT_DIR"] s3 = boto3.resource("s3") destination_object_key = path.join( DESTINATION_OBJECKT_DIR, path.basename(EVENT_OBJECKT_KEY) ) event_object = s3.Object(DESTINATION_BUCKET, destination_object_key) event_object.copy({"Bucket": EVENT_BUCKET, "Key": EVENT_OBJECKT_KEY})
Pythonスクリプトの依存パッケージを記述したrequirements.txt
は次の通りです。
boto3==1.13.4
コンテナイメージをビルドし、ECRのリポジトリへプッシュします。
aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin {アカウントID}.dkr.ecr.ap-northeast-1.amazonaws.com docker build -t python-worker . docker tag python-worker:latest {アカウントID}.dkr.ecr.ap-northeast-1.amazonaws.com/python-worker:latest docker push {アカウントID}.dkr.ecr.ap-northeast-1.amazonaws.com/python-worker:latest
ECSのタスク定義を作成する
ECSでタスク定義を作成します。Fargate互換を指定し、コンテナには先ほどプッシュしたイメージを設定します。今回の処理は簡単なものなので、基本的に全てデフォルト設定のまま利用し、CPUとメモリも最小のものを指定しています。
タスクの実行ロールを作成
ECSでのタスク定義の中でタスクの実行ロールを指定する必要があります。今回はS3のフルアクセス権限を持つ、次のようなIAMロールを作成しました。
ECSクラスタを作成する
Fargate用のテンプレートを元にECSクラスタを作成します。
設定はデフォルトのものをそのまま利用します。
Lambda関数を作成する
ECS側の設定が終わったので、次はLambda関数を作成します。
ランタイムにはPython 3.8
を指定します。IAMロールは自動作成されるものを利用し、あとで必要な権限を追加します。
Lambda関数に次のスクリプトを設定します。 このスクリプは渡されたS3イベントを元にECSのタスクを実行するという内容です。S3イベントに含まれるバケットやキーなどの情報は環境変数として設定し、ECSタスクに渡しています。
import boto3 import os ecs_client = boto3.client("ecs") ECS_CLUSTER = os.environ["ECS_CLUSTER"] TASK_DEFINITION = os.environ["TASK_DEFINITION"] DESTINATION_BUCKET = os.environ["DESTINATION_BUCKET"] DESTINATION_OBJECKT_DIR = os.environ["DESTINATION_OBJECKT_DIR"] SUBNET_ID_1 = os.environ["SUBNET_ID_1"] def lambda_handler(event, context): s3_event = event["Records"][0]["s3"] event_bucket = s3_event["bucket"]["name"] event_object_key = s3_event["object"]["key"] ecs_client.run_task( cluster=ECS_CLUSTER, launchType="FARGATE", networkConfiguration={ "awsvpcConfiguration": { "subnets": [SUBNET_ID_1], "assignPublicIp": "ENABLED", } }, overrides={ "containerOverrides": [ { "name": "python-worker", "environment": [ {"name": "EVENT_BUCKET", "value": event_bucket}, {"name": "EVENT_OBJECKT_KEY", "value": event_object_key}, {"name": "DESTINATION_BUCKET", "value": DESTINATION_BUCKET}, {"name": "DESTINATION_OBJECKT_DIR", "value": DESTINATION_OBJECKT_DIR}, ], }, ], }, taskDefinition=TASK_DEFINITION, )
環境変数を次のように設定します。
Lambda関数の実行ロールに権限を追加する
Lambda関数作成時に自動的に作成されたIAMロールだといくつか権限が不足しているので、インラインポリシーで権限を追加します。
Permissions
タブから実行ロールの設定画面に移動します。
インラインポリシーの追加画面に移動します。
以下の権限をそれぞれ追加します。
- ecs:RunTask
- iam:PassRole
トリガーを設定する
特定のパス配下にオブジェクトが生成されたら、Lambda関数が実行されるように、次のようなトリガーを設定します。
動作を確認する
Lambda関数の各種設定が終わり、設定内容を保存したら、S3にオブジェクトを保存し動作を確認してみます。
空ファイルを作成し、S3に保存します。
touch testobject aws s3 cp testobject s3://{バケット名}/event-trigger-test/source/
2分くらい経つと、ECS on Fargateでのタスク実行が終わり、指定した場所にファイルが保存されています。
S3イベント -> Lambda関数 -> ECS on Fargateという流れで無事動き、S3オブジェクトがコピーされたことを確認できました。
さいごに
ECS on FargateでS3イベントに基づく処理を実行させる流れを試してみました。試している途中にEventBridge経由でECSタスクを直接起動する方法も見つけました。この方法だと、ECSタスクを起動するLambda関数を挟む必要がなく、今回試した方法より楽そうなのでまた試してみたいと思います。